package org.zjvis.datascience.jobserver.service;

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Exchanger;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.PostConstruct;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestClientException;
import org.zjvis.datascience.common.util.RestTemplateUtil;
import org.zjvis.datascience.common.util.TimeUtil;
import org.zjvis.datascience.jobserver.common.util.SparkJobServerRestUtil;
import org.zjvis.datascience.jobserver.service.hdfs.FileSystemHolder;
import org.zjvis.datascience.jobserver.service.yarn.SparkLauncherConfig;
import org.zjvis.datascience.jobserver.service.yarn.YarnClientHolder;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Joiner;
import cn.hutool.core.text.UnicodeUtil;

/**
 * @description spark任务转发Service
 * @date 2021-12-28
 */
@Service
public class JobService {

    private final static Logger logger = LoggerFactory.getLogger("JobService");

    @Value("${yarn.home}")
    private String yarnHome;

    @Autowired
    private YarnClientHolder yarnClientHolder;

    @Autowired
    private SparkLauncherConfig sparkLauncherConfig;

    @Autowired
    private SparkJobServerRestUtil sparkJobServerRestUtil;

    @Autowired
    private RestTemplateUtil restTemplateUtil;

    @Autowired
    private FileSystemHolder fileSystemHolder;

    public static final String RUN_META_PATH = "/zjvis/runtimeMeta/%s/part-00000";

    private static final int CONTEXT_NUM = 1;

    private static final BlockingQueue<String> contextPool = new LinkedBlockingQueue<>(CONTEXT_NUM);

    /**
     * Creates the Spark JobServer contexts at application startup, but only when
     * a local jobserver is in use ({@code restTemplateUtil.getLocalFlag()}).
     * Each context is (re)created on its own thread and added to
     * {@link #contextPool} once ready.
     */
    @PostConstruct
    public void initContext() {
        if (restTemplateUtil.getLocalFlag()) {
            for (int i = 0; i < CONTEXT_NUM; i++) {
                new Thread(() -> {
                    // Thread name is the loop index, so contexts are named
                    // datascience-context0, datascience-context1, ...
                    String name = "datascience-context" + Thread.currentThread().getName();
                    // Drop any stale context left over from a previous run before recreating it.
                    String info = deleteContext(name);
                    logger.info(
                            "JobService.initContext(),deleteContext success contextName = {}, deleteInfo = {}",
                            name, info);
                    if (StringUtils.isNotEmpty(createContext(name))) {
                        contextPool.add(name);
                        logger.info("JobService.initContext context ready, contextName = {}", name);
                    }
                }, String.valueOf(i)).start();
            }
            // Creation is asynchronous; do not claim success (or log the pool) here,
            // the pool is typically still empty at this point.
            logger.info("JobService.initContext started, contexts are being created asynchronously");
        } else {
            logger.info("There is no need for JobService to create a spark context, since you are using an external jobserver");
        }
    }


    /**
     * Queries YARN for the current status report of the given application.
     *
     * @param applicationIdStr application id string, e.g. "application_1640000000000_0001"
     * @param directory        runtime-meta directory; currently unused (see disabled code below)
     * @return the YARN {@link ApplicationReport} for the application
     * @throws IOException   on communication failure with YARN
     * @throws YarnException on YARN-side errors (e.g. unknown application id)
     */
    public ApplicationReport queryJobStatus(String applicationIdStr, String directory)
        throws IOException, YarnException {
        YarnClient yarnClient = yarnClientHolder.getYarnClient();
        ApplicationId applicationId = covertApplicationId(applicationIdStr);
        ApplicationReport report = yarnClient.getApplicationReport(applicationId);
        // A successfully finished operator writes runtime metadata which used to be
        // read back here and returned in the diagnostics field. The operator now
        // writes to MySQL directly, so this is disabled but kept for reference.
//    if ("SUCCEEDED".equals(report.getFinalApplicationStatus().toString())) {
//      String runtimeMeta = queryRuntimeMeta(directory);
//      report.setDiagnostics(runtimeMeta);
//      logger.info("Job SUCCEED, runtimeMeta={}", runtimeMeta);
//    }
        return report;
    }

    /**
     * Asks YARN to kill the given application.
     *
     * @param applicationIdStr application id string, e.g. "application_1640000000000_0001"
     * @throws IOException   on communication failure with YARN
     * @throws YarnException on YARN-side errors (e.g. unknown application id)
     */
    public void killApplication(String applicationIdStr) throws IOException, YarnException {
        ApplicationId appId = covertApplicationId(applicationIdStr);
        yarnClientHolder.getYarnClient().killApplication(appId);
    }

    /**
     * Submits a Spark job via {@link SparkLauncher} and blocks until the
     * launcher's listener hands back a YARN application id. When the job later
     * reaches a final state, {@link #callbackDataScience} is invoked from the
     * listener thread.
     *
     * @param appArgs space-separated application arguments; the first token is used as the app name
     * @return the YARN application id, or "" if waiting for it was interrupted
     * @throws IOException if the launcher fails to start the application
     */
    public String startApplication(String appArgs) throws IOException {
        Exchanger<String> exchanger = new Exchanger<>();
        String applicationId = "";
        // 1 = appId not yet handed back to the submitting thread.
        AtomicInteger flag = new AtomicInteger(1);
        String[] argArray = appArgs.split(" ");
        for (int i = 0; i < argArray.length; i++) {
            // Escape non-ASCII characters so they survive command-line transport.
            argArray[i] = UnicodeUtil.toUnicode(argArray[i]);
        }
        new SparkLauncher()
            .setAppName(argArray[0])
            .setSparkHome(sparkLauncherConfig.getSparkHome())
            .setMaster(sparkLauncherConfig.getMaster())
            .setAppResource(sparkLauncherConfig.getAppResource())
            .setMainClass(sparkLauncherConfig.getMainClass())
            .setConf("spark.driver.memory", sparkLauncherConfig.getDriverMem())
            .setConf("spark.executor.memory", sparkLauncherConfig.getExecutorMemory())
            .setConf("spark.executor.cores", sparkLauncherConfig.getExecutorCores())
            .addAppArgs(argArray)
            .setDeployMode(sparkLauncherConfig.getDeployMode())
            .startApplication(new SparkAppHandle.Listener() {
                @Override
                public void stateChanged(SparkAppHandle handle) {
                    if (flag.get() == 1) {
                        if (StringUtils.isNotEmpty(handle.getAppId())) {
                            try {
                                // Hand the appId to the thread blocked below; time-bounded so a
                                // vanished submitter cannot stall listener callbacks forever.
                                exchanger.exchange(handle.getAppId(), 300, TimeUnit.MILLISECONDS);
                                flag.set(0);
                            } catch (Exception e) {
                                logger.warn("JobService -> startApplication failed, since {}", e.getMessage());
                            }
                        }
                    } else {
                        if (handle.getState().isFinal()) {
                            callbackDataScience(handle.getAppId(), appArgs);
                        }
                    }
                }
                @Override
                public void infoChanged(SparkAppHandle handle) {
                }
            });

        try {
            applicationId = exchanger.exchange("");
        } catch (InterruptedException e) {
            // Restore the interrupt status instead of swallowing it.
            Thread.currentThread().interrupt();
            logger.warn("JobService -> startApplication failed, since {}", e.getMessage());
        }
        logger.info("submit job success, applicationId={}", applicationId);
        return applicationId;
    }

    /**
     * Submits a pyspark job via {@link SparkLauncher} and blocks until the
     * launcher's listener hands back a YARN application id.
     *
     * @param appArgs         space-separated application arguments; the first token is used as the app name
     * @param appResourcePath pyspark script path; relative paths are resolved
     *                        against the configured script folder
     * @return the YARN application id, or "" if waiting for it was interrupted
     * @throws IOException if the launcher fails to start the application
     */
    public String startPySparkApplication(String appArgs, String appResourcePath) throws IOException {
        Exchanger<String> exchanger = new Exchanger<>();
        String applicationId = "";
        // 1 = appId not yet handed back to the submitting thread.
        AtomicInteger flag = new AtomicInteger(1);
        String[] argArray = appArgs.split(" ");
        for (int i = 0; i < argArray.length; i++) {
            // Escape non-ASCII characters so they survive command-line transport.
            argArray[i] = UnicodeUtil.toUnicode(argArray[i]);
        }
        if (!(appResourcePath.startsWith("/") || appResourcePath.startsWith("hdfs:"))) {
            appResourcePath = String
                .format("%s/%s", sparkLauncherConfig.getScriptFolder(), appResourcePath);
        }
        new SparkLauncher()
            .setAppName(argArray[0])
            .setSparkHome(sparkLauncherConfig.getSparkHome())
            .setMaster(sparkLauncherConfig.getMaster())
            .setAppResource(appResourcePath)
            .setConf("spark.driver.memory", sparkLauncherConfig.getDriverMem())
            .setConf("spark.executor.memory", sparkLauncherConfig.getExecutorMemory())
            .setConf("spark.executor.cores", sparkLauncherConfig.getExecutorCores())
            .addSparkArg("--py-files", sparkLauncherConfig.getPyFiles())
            .addAppArgs(argArray)
            .setDeployMode(sparkLauncherConfig.getDeployMode())
            .startApplication(new SparkAppHandle.Listener() {
                @Override
                public void stateChanged(SparkAppHandle handle) {
                    if (flag.get() == 1) {
                        if (StringUtils.isNotEmpty(handle.getAppId())) {
                            try {
                                exchanger.exchange(handle.getAppId(), 300, TimeUnit.MILLISECONDS);
                                flag.set(0);
                            } catch (Exception e) {
                                logger.warn("JobService -> startPySparkApplication failed, since {}", e.getMessage());
                            }
                        } else {
                            //when sql script is empty, logic will flows here
                            logger.info("handle.getAppId() is null, handle.getAppId()={}",
                                handle.getAppId());
                            // No application was actually launched; report an immediate
                            // success so the caller does not wait forever.
                            JSONObject json = new JSONObject();
                            json.put("id", "");
                            json.put("finalStatus", "SUCCEEDED");
                            json.put("progress", 1.0f);
                            json.put("diagnostics", "");
                            sparkJobServerRestUtil.notifyFinalStatus(json);
                        }
                    } else {
                        logger.info("else block handle.getState(), handle.getState()={}",
                            handle.getState());
                        if (handle.getState().isFinal()) {
                            callbackDataScience(handle.getAppId(), appArgs);
                        } else {
                            logger.info(" handle.getState() not final, handle.getState()={}",
                                handle.getState());
                        }
                    }
                }

                @Override
                public void infoChanged(SparkAppHandle handle) {
                }
            });

        try {
            applicationId = exchanger.exchange("");
        } catch (InterruptedException e) {
            // Restore the interrupt status instead of swallowing it.
            Thread.currentThread().interrupt();
            logger.warn("JobService -> startPySparkApplication failed, since {}", e.getMessage());
        }
        return applicationId;
    }


    /**
     * Submits a pyspark ML (aiworks) job via {@link SparkLauncher} and blocks
     * until the launcher's listener hands back a YARN application id.
     *
     * @param appArgs space-separated application arguments
     * @return the YARN application id, or "" if waiting for it was interrupted
     * @throws IOException if the launcher fails to start the application
     */
    public String startMLApplication(String appArgs) throws IOException {
        Exchanger<String> exchanger = new Exchanger<>();
        String applicationId = "";
        // 1 = appId not yet handed back to the submitting thread.
        AtomicInteger flag = new AtomicInteger(1);
        String[] argArray = appArgs.split(" ");
        for (int i = 0; i < argArray.length; i++) {
            // Escape non-ASCII characters so they survive command-line transport.
            argArray[i] = UnicodeUtil.toUnicode(argArray[i]);
        }
        new SparkLauncher()
            .setSparkHome(sparkLauncherConfig.getSparkHome())
            .setConf("spark.driver.memory", sparkLauncherConfig.getDriverMem())
            .setConf("spark.executor.memory", sparkLauncherConfig.getExecutorMemory())
            .setConf("spark.executor.cores", sparkLauncherConfig.getExecutorCores())
            .setAppResource(sparkLauncherConfig.getAiworkspyMain())
            .addAppArgs(argArray)
            .startApplication(new SparkAppHandle.Listener() {
                @Override
                public void stateChanged(SparkAppHandle handle) {
                    if (flag.get() == 1) {
                        if (StringUtils.isNotEmpty(handle.getAppId())) {
                            try {
                                exchanger.exchange(handle.getAppId(), 300, TimeUnit.MILLISECONDS);
                                flag.set(0);
                            } catch (Exception e) {
                                // Logged (not printStackTrace) for consistency with the sibling launchers.
                                logger.warn("JobService -> startMLApplication failed, since {}", e.getMessage());
                            }
                        } else {
                            //when sql script is empty, logic will flows here
                            logger.info("handle.getAppId() is null, handle.getAppId()={}", handle.getAppId());
                            // No application was actually launched; report an immediate
                            // success so the caller does not wait forever.
                            JSONObject json = new JSONObject();
                            json.put("id", "");
                            json.put("finalStatus", "SUCCEEDED");
                            json.put("progress", 1.0f);
                            json.put("diagnostics", "");
                            sparkJobServerRestUtil.notifyFinalStatus(json);
                        }
                    } else {
                        logger.info("else block handle.getState(), handle.getState()={}", handle.getState());
                        if (handle.getState().isFinal()) {
                            callbackDataScience(handle.getAppId(), appArgs);
                        } else {
                            logger.info(" handle.getState() not final, handle.getState()={}", handle.getState());
                        }
                    }
                }
                @Override
                public void infoChanged(SparkAppHandle handle) {
                }
            });

        try {
            applicationId = exchanger.exchange("");
        } catch (InterruptedException e) {
            // Restore the interrupt status instead of swallowing it.
            Thread.currentThread().interrupt();
            logger.warn("JobService -> startMLApplication failed, since {}", e.getMessage());
        }
        return applicationId;
    }


    /**
     * Notifies dataScience of the final status of a finished job.
     *
     * @param appId   the YARN application id of the finished job
     * @param appArgs original submission arguments (currently unused; kept for
     *                signature compatibility with callers)
     */
    public void callbackDataScience(String appId, String... appArgs) {
        logger.info("Job:{} is finished, send callback...", appId);
        try {
            final ApplicationReport report = queryJobStatus(appId, null);
            final JSONObject payload = new JSONObject();
            payload.put("id", appId);
            payload.put("finalStatus", report.getFinalApplicationStatus().toString());
            payload.put("progress", report.getProgress());
            payload.put("diagnostics", report.getDiagnostics());
            logger.info(String.valueOf(payload));
            sparkJobServerRestUtil.notifyFinalStatus(payload);
        } catch (Exception e) {
            // On notification failure we deliberately stop retrying; dataScience
            // will poll the job status itself instead.
            logger.error("jobService -> callbackDataScience failed,appId={} message={}", appId, e.getMessage());
        }
    }

    /**
     * Converts an application id string such as "application_1640000000000_0001"
     * into a YARN {@link ApplicationId}.
     *
     * @param applicationIdStr id in the form "application_&lt;clusterTimestamp&gt;_&lt;id&gt;"
     * @return the corresponding {@link ApplicationId}
     */
    public ApplicationId covertApplicationId(String applicationIdStr) {
        String[] parts = applicationIdStr.split("_");
        // parts[0] is the literal "application" prefix; parse primitives directly
        // to avoid needless boxing via Long.valueOf/Integer.valueOf.
        long clusterTimestamp = Long.parseLong(parts[1]);
        int id = Integer.parseInt(parts[2]);
        return ApplicationId.newInstance(clusterTimestamp, id);
    }

    /**
     * Streams the aggregated YARN logs of an application to {@code ps}.
     * <p>
     * Reads the node-manager aggregated log files under the remote app log dir
     * (yarn.nodemanager.remote-app-log-dir) and prints every container log
     * they contain.
     *
     * @param applicationId application id string, e.g. "application_..._0001"
     * @param ps            destination stream for the log text
     * @param page          1-based index of the node-log file to print; null or
     *                      non-positive prints all files
     * @throws IOException on HDFS access failure
     */
    public void printLog(String applicationId, PrintStream ps, Integer page) throws IOException {
        Configuration conf = new YarnConfiguration();
        conf.addResource(new Path(yarnHome + "/core-site.xml"));
        conf.addResource(new Path(yarnHome + "/yarn-site.xml"));
        conf.addResource(new Path(yarnHome + "/hdfs-site.xml"));

        ApplicationId appId = ConverterUtils.toApplicationId(applicationId);

        Path remoteRootLogDir = new Path(
            conf.get("yarn.nodemanager.remote-app-log-dir", "/tmp/logs"));
        // Only logs of jobs submitted by the current linux user are readable.
        String user = UserGroupInformation.getCurrentUser().getShortUserName();
        String logDirSuffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);

        Path remoteAppLogDir = LogAggregationUtils
            .getRemoteAppLogDir(remoteRootLogDir, appId, user, logDirSuffix);
        RemoteIterator<FileStatus> nodeFiles;
        try {
            Path qualifiedLogDir = FileContext.getFileContext(conf).makeQualified(remoteAppLogDir);
            nodeFiles = FileContext.getFileContext(qualifiedLogDir.toUri(), conf)
                .listStatus(remoteAppLogDir);
        } catch (FileNotFoundException e) {
            logger.error("jobService -> printLog failed,appId={} message={}", appId, e.getMessage());
            ps.println("Log file not exist");
            return;
        }

        boolean foundAnyLogs = false;
        int index = 1;
        while (nodeFiles.hasNext()) {
            // When a specific page is requested, skip every node-log file except
            // the one at that index.
            if (page != null && page > 0 && page != index) {
                index++;
                nodeFiles.next();
                continue;
            }
            FileStatus thisNodeFile = nodeFiles.next();
            // Files still being aggregated end with ".tmp"; skip them.
            if (!thisNodeFile.getPath().getName().endsWith(".tmp")) {
                AggregatedLogFormat.LogReader reader = new AggregatedLogFormat.LogReader(conf,
                    thisNodeFile.getPath());
                try {
                    AggregatedLogFormat.LogKey key = new AggregatedLogFormat.LogKey();
                    DataInputStream valueStream = reader.next(key);
                    // Outer loop: one iteration per container (LogKey) in the file;
                    // reader.next(key) returns null when the file is exhausted.
                    for (; ; ) {
                        if (valueStream != null) {
                            try {
                                // Inner loop: print each log type of this container;
                                // readAContainerLogsForALogType signals the end of the
                                // container's logs by throwing EOFException.
                                for (; ; ) {
                                    AggregatedLogFormat.LogReader.readAContainerLogsForALogType(valueStream, ps,
                                            thisNodeFile.getModificationTime());
                                    foundAnyLogs = true;
                                }
                            } catch (EOFException eof) {
                                // Advance to the next container in this file.
                                key = new AggregatedLogFormat.LogKey();
                                valueStream = reader.next(key);
                            }

                        } else {
                            break;
                        }
                    }
                } finally {
                    reader.close();
                }
            }
            index++;
        }
        if (!foundAnyLogs) {
            ps.println("Log ending");
            return;
        }
    }

    /**
     * Queries the JobServer for YARN applications matching the given filters.
     * Date-string time filters are converted to timestamps, and if no states
     * filter is supplied all non-final states are queried.
     *
     * @param req filter parameters: states, startedTimeBegin/End, finishedTimeBegin/End
     * @return the JobServer response
     */
    public JSONObject queryApps(JSONObject req) {
        if (StringUtils.isEmpty(req.getString("states"))) {
            // Default to all non-final application states.
            req.put("states", "NEW,NEW_SAVING,SUBMITTED,ACCEPTED,RUNNING");
        }
        convertTimeFilter(req, "startedTimeBegin");
        convertTimeFilter(req, "startedTimeEnd");
        convertTimeFilter(req, "finishedTimeBegin");
        convertTimeFilter(req, "finishedTimeEnd");
        String args = Joiner.on("&").useForNull("").withKeyValueSeparator('=').join(req);
        return sparkJobServerRestUtil.queryApps(args);
    }

    /** Replaces a date-string filter value in {@code req} with its timestamp form, if present. */
    private void convertTimeFilter(JSONObject req, String key) {
        String value = req.getString(key);
        if (StringUtils.isNotEmpty(value)) {
            req.put(key, TimeUtil.convertDateToTimeStamp(value));
        }
    }

    /**
     * Reads the runtime metadata written by a successfully finished operator.
     *
     * @param directory directory name substituted into {@link #RUN_META_PATH}
     * @return the metadata file content, or "" if it cannot be read
     */
    public String queryRuntimeMeta(String directory) {
        FileSystem fs = fileSystemHolder.getFileSystem();
        Path path = new Path(String.format(RUN_META_PATH, directory));
        // try-with-resources guarantees the stream is closed even on failure;
        // UTF-8 is explicit to match the charset writePyScriptToHDFS writes with
        // (the old charset-less IOUtils.toString used the platform default).
        try (FSDataInputStream open = fs.open(path)) {
            return IOUtils.toString(open, StandardCharsets.UTF_8);
        } catch (IOException e) {
            logger.error("jobService -> queryRuntimeMeta failed, message={}", e.getMessage());
        }
        return "";
    }

    /**
     * Writes a python script body to HDFS under the configured script folder,
     * overwriting any existing file.
     *
     * @param params JSON with "appResourcePath" (path relative to the script
     *               folder) and "scriptBody" (script text, written as UTF-8)
     * @return true on success, false if the write failed
     */
    public boolean writePyScriptToHDFS(JSONObject params) {
        String appResourcePath = params.getString("appResourcePath");
        String scriptBody = params.getString("scriptBody");

        String fullPath = String
            .format("%s%s", sparkLauncherConfig.getScriptFolder(), appResourcePath);
        logger.info("fullpath_write");
        logger.info(fullPath);
        FileSystem fs = fileSystemHolder.getFileSystem();
        logger.info("writeScript={}", scriptBody);
        // try-with-resources closes the stream on every path; the previous
        // version leaked the stream whenever write()/flush() threw.
        try (FSDataOutputStream out = fs.create(new Path(fullPath), true)) {
            out.write(scriptBody.getBytes(StandardCharsets.UTF_8));
            out.flush();
        } catch (Exception e) {
            logger.error("jobService -> writePyScriptToHDFS failed, message={}", e.getMessage());
            return false;
        }
        return true;
    }

    /**
     * Submits a task to the Spark JobServer using a context borrowed from
     * {@link #contextPool}; a successfully borrowed context is always returned
     * to the pool afterwards.
     *
     * @param appArgs raw application arguments
     * @return the JobServer response, or "" on failure
     */
    public String submitToJobServer(String appArgs) {
        String inputString = UnicodeUtil.toUnicode(appArgs);
        String contextName = null;
        String result = "";
        try {
            contextName = contextPool.take();
            logger.info("enter submitToJobServer");
            logger.info(contextName);
            inputString = "\"" + inputString + "\"";
            result = sparkJobServerRestUtil.submitToJobServer(inputString, contextName);
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers can observe it.
            Thread.currentThread().interrupt();
            logger.error("jobService -> submitToJobServer failed, message={}", e.getMessage());
        } catch (Exception e) {
            logger.error("jobService -> submitToJobServer failed, message={}", e.getMessage());
        } finally {
            // Only return a context we actually took. Previously a failed take()
            // pushed the "" placeholder into the pool, corrupting it — and since
            // the pool is bounded, add() could itself throw from this finally.
            if (contextName != null) {
                contextPool.add(contextName);
            }
        }
        return result;
    }

    /**
     * Creates a Spark JobServer context with the given name.
     *
     * @param contextName name of the context to create
     * @return the JobServer response, or "" if the REST call failed
     */
    public String createContext(String contextName) {
        logger.info(contextName);
        String response = "";
        try {
            response = sparkJobServerRestUtil.createContext(contextName);
        } catch (RestClientException e) {
            logger.error("jobService -> createContext failed, message={}", e.getMessage());
        }
        return response;
    }

    /**
     * Stops (deletes) a context on the Spark JobServer.
     *
     * @param contextName name of the context to delete
     * @return the JobServer response, or "" if the REST call failed
     */
    public String deleteContext(String contextName) {
        String response = "";
        try {
            response = sparkJobServerRestUtil.deleteContext(contextName);
        } catch (RestClientException e) {
            logger.error("jobService -> deleteContext failed, message={}", e.getMessage());
        }
        return response;
    }
}
